AWS再入門ブログリレー Amazon SQS編
こんにちは、もこ@札幌オフィスです。
当エントリは弊社コンサルティング部による『AWS 再入門ブログリレー 2020』の 5日目のエントリです。
このブログリレーの企画は、普段 AWS サービスについて最新のネタ・深い/細かいテーマを主に書き連ねてきたメンバーの手によって、 今一度初心に返って、基本的な部分を見つめ直してみよう、解説してみようというコンセプトが含まれています。
AWS をこれから学ぼう!という方にとっては文字通りの入門記事として、またすでにAWSを活用されている方にとっても AWSサービスの再発見や2020 年のサービスアップデートのキャッチアップの場となればと考えておりますので、ぜひ最後までお付合い頂ければ幸いです。
では、さっそくいってみましょう。5日目のテーマはAmazon SQSです。
Amazon SQSとは?
Amazon SQSとは「Amazon Simple Queue Service」の略で、その名の通りメッセージのキューを提供するマネージド型のサービスで、アプリケーション間で処理を分離する時に利用する場合、処理の橋渡し役となる安全かつスケーラビリティの高いキューです。
バッチ処理などでワーカープロセスを柔軟にスケールするような構成でSQSを利用すると、様々なユースケースで利用することが出来ます。
SQSはメッセージを取得後も(一部の例外を除き)明示的にキューからメッセージを削除しない限りキューに残り続けるため、処理が失敗してもメッセージを失うこと無くリトライする事が出来ます。
具体的なユースケース
実際にSQSをサービスに組み込む時に使われる、よくあるSQSのアーキテクチャ一例をご紹介します。
SQSで時間が掛かる処理をオフロードするパターン
バッチ処理や複雑で重い処理をHTTPで受け付ける場合などでSQSを利用する場合、下記のような構成を取ることが出来ます。
- 処理リクエストを投げる
- SQSに処理内容が書かれたメッセージを送信
- 202 Acceptedレスポンスを返却しておく
- SQSをポーリングする形でメッセージを取得して、処理をする
このような構成の場合、フロントとバックエンドの処理を別けることができ、処理を待たずにキューイングするような実装を簡単に作る事ができます。
SQSではキューに貯まっているメッセージ数からAutoScalingさせるように構成する事も出来るので、処理するワーカーノードを柔軟にスケーリングさせることも出来ます。
アプリケーション間の連携に使うパターン
マイクロサービスなどのアプリケーション間連携にSQSを使うケースもよくあります。
- 処理内容をSQSに送信
- SQSをポーリングする形でアプリケーション間の連携を行う
アプリケーション間の連携でSQSを使うことで、万が一処理先のマイクロサービスで障害が起きた場合でもメッセージはSQSに貯まり続けて、障害復帰後に処理を再開出来ます。
SQS標準キューは"ほぼ無制限の"APIコールをサポートしているので、スパイク的な大量のリクエストも捌いてくれます。
標準キューとFIFOの違い
SQSは「標準キュー」と「FIFOキュー」の2種類があります。
標準キューでは"ほぼ無制限"のAPIコールをサポートしている一方で、メッセージを重複して取得してしまう可能性があったり、メッセージの取得順が保証されていません。
「メッセージが登録された順に1度だけ」のような順序保証と重複排除が必要な場合はFIFOキューを使いましょう。
FIFOキューは順番を保証する代わりに1秒あたりの最大トランザクションは3000となっており、料金も標準キューが100万リクエストにつき0.4ドルに対して、FIFOは0.5ドルと、若干高くなっています。(2020年8月7日時点)
FIFOキューについては「【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)」のエントリーがとてもわかりやすく詳細に解説されているので、是非こちらも合わせてご覧下さい。
SQSをアプリケーションに組み込んでみる
ここまでSQSの基本的な仕組みとユースケースについてご紹介してきましたが、実際にアプリケーションに組み込んで挙動を確認してみましょう。
SQSにメッセージを登録するプロデューサー側
SQSではメッセージを登録するアプリケーションの事を「プロデューサー」と呼びます。
下記コードはSQSに対して「こんにちは」というメッセージを登録するサンプルコードです。
※実際には処理内容を記載して登録する形になります
const AWS = require("aws-sdk") const SQS = new AWS.SQS({ region: "ap-northeast-1" }) const QueueUrl = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/example" async function sendMessage(message) { try { const MessageBody = JSON.stringify({ message }) const result = await SQS.sendMessage({ MessageBody, QueueUrl }).promise() console.log(result) } catch (e) { console.error(e) } } sendMessage("こんにちは")
実行するとこんな感じのレスポンスが帰ってきます。
{ "ResponseMetadata": { "RequestId": "2245f4a0-f8b7-50a7-9e4b-a3060bda0671" }, "MD5OfMessageBody": "04366f504cabfb9b64545bd8f0a6012b", "MessageId": "72dc98e1-84f8-4e8a-bdb9-7c0f9cb62e57" }
マネージドコンソールを見るとキューにメッセージが貯まっていることを確認出来ます。
EC2などでメッセージをSQSから取得して処理するコンシューマー側
SQSではメッセージを取得して処理を行うアプリケーションを「コンシューマー」と呼びます。
プロデューサー側で登録したメッセージを取得して、処理をして、メッセージを削除する一連の流れのコードはこんな感じです。
const AWS = require("aws-sdk") const SQS = new AWS.SQS({ region: "ap-northeast-1" }) const QueueUrl = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/example" async function processingMessage() { try { const params = { QueueUrl, MaxNumberOfMessages: 10, // 最大取得メッセージ WaitTimeSeconds: 20 // ロングポーリング } const messages = await SQS.receiveMessage(params).promise() // メッセージを取得 for (message of messages.Messages) { const body = JSON.parse(message.Body) console.log(message) console.log(body.message) //メッセージの実態 //重い処理 await new Promise((resolve, reject) => { setTimeout(resolve, 1000) }) const params = { QueueUrl, ReceiptHandle: message.ReceiptHandle } const deleteResult = await SQS.deleteMessage(params).promise() console.log(deleteResult) } } catch (e) { console.error(e) } } processingMessage()
SQSでは一部の例外(Dead Letter Queueと保存期間切れ)を除き明示的にメッセージを削除しないとキューにメッセージが残り続けるので、 DeleteMessage
でメッセージを削除してあげる必要があります。
ReceiveMessage
で取得出来るデータはこんな感じで、 ReceiptHandle
を DeleteMessage
に渡してあげるとキューからメッセージを削除出来ます。
{ "ResponseMetadata": { "RequestId": "8bc818cd-89b5-5746-8429-4f6b7eb8ecd4" }, "Messages": [ { "MessageId": "0428c6c9-9193-45f6-a7fa-10ca0b6efaf7", "ReceiptHandle": "AQEBnNV3auHIuRy8Oh8Pdlr8GAEmzu1Gt9iZ1zEr2AIxOvSpeqCthwI9JtaH7jZyC4CJUUrdQR60b0G6MtkNBLR4sJ1HFAhYg04kDGTfCySB49NKFTXtIkK5SF4jcf/W71j8+jDy7HwbsRqO8PrmdJ8wZKzKIHsYnTNpT9NYOltOgIMP22JA+v+/fn64W1jfTUOKmHaISkZwEnSF++aULZH/FQCXZbN7P9FbeeqmlWOOaY4w0FLNnftEjxEplu+NNx9thQn062TsAINCTFNI7yx26KlBG+fAqYTK0g1Uzyk+ZGRdd562ylrqB/hRrNgnPRRYfDOCD0mJ5qXhllXIUdRV8011P+UWf2s10WjheBq9WjNEB06xCKm9/1ZuIZaMpW1CS+6IJy3qGWIXOMK0cCM6mg==", "MD5OfBody": "04366f504cabfb9b64545bd8f0a6012b", "Body": "{\"message\":\"こんにちは\"}" }, { "MessageId": "94124ef1-659f-48e4-a594-ea146e7e53cd", "ReceiptHandle": "AQEB3qoCIBtUecBjVLAtG4K/4lXVyoHInqj/8svoXZ5Oc3tcuxk6vc/GlAwb5gt/YtbtNAOHB2q4l9xSNFNgnTgtsKr9dxR+izHaJMXNLaXwPVCbQGkQAo01Jgj24qvHDXb3hz92EdFFiAryQHYMrIhV5ooo5un7pRygIvg4+Nn55KQkJMyrdoUdA2DthXARE0T5t6M2A8pbaYy0jG9uv0pOl34AQagdh/OqJ6qpdfEzsXx7r3gzjQ1Pg8GE1C2S+whMSdyau83O0irAfLbvPypPdUY4nGoqXyA4No/7wmvVt3n6b6XY05OIHCJRqeOqyIh2yMC8aTSLWQp+ZjF+YJFGh5igXt63+yPEPPkU0nQnLq3nKn94RTxFTMAxp21OKIytmBx6RYcwZeWe9TnU2s2kdw==", "MD5OfBody": "04366f504cabfb9b64545bd8f0a6012b", "Body": "{\"message\":\"こんにちは\"}" } ] }
LambdaでSQSのメッセージを取得して処理する方法
SQSをイベントソースにしてLambdaで処理を行うことが可能です。
通常は上記のようにAWS SDKを利用して ReceiveMessage
でメッセージ取得して、処理が完了した後に DeleteMessage
でキューからメッセージを削除する必要がありますが、SQSをイベントソースにしたLambdaの場合、Lambdaが正常終了するとメッセージが削除される挙動となります。
import { Context, SQSEvent, Callback } from 'aws-lambda'; export async function handler(event: SQSEvent, context: Context, callback: Callback) { try { for (const message of event.Records) { // キューの処理 console.log(message); await new Promise((resolve, reject) => { setTimeout(resolve, 1000); //重い処理 }); } // 正常終了でcallback callback(null, 'success'); } catch (e) { callback(e); } }
SQSをイベントソースにしたLambdaの詳細については、下記ブログにて解説されていますので是非合わせてご覧下さい。
ライフサイクルとSQSの設定
さて、ここまでユースケースやアプリケーションへの実装方法をご紹介してきましたが、SQSではその他にも「かゆいところに手が届くメッセージライフサイクル」機能が複数存在します。
SQSのライフサイクルついて再入門していきましょう。
メッセージのライフサイクル
まずはじめにメッセージのライフサイクルについておさらいです。
SQSのメッセージライフサイクルは大きく分けて、「遅延キュー、可視性タイムアウト、DLQ(Dead Letter Queue)、保存期間切れ」の4段階に別けることが出来ます。
・SQSにメッセージを送信する SendMessage
を利用した後、設定された「遅延キュー」の間はメッセージを取得出来ない状態になります。
・SQSは ReceiveMessage
を使ってメッセージを取得した時、「可視性タイムアウト」が発動して、同じメッセージが設定期間見えなくなります。
・設定した回数以上ReceiveMessage
されたが DeleteMessage
されずにまだキューにある状態の場合、DLQ(Dead Letter Queue)にメッセージが移動されます。
・キューの設定した保存期間を過ぎるとメッセージは削除されます。 保存期間を過ぎた場合DLQ(Dead Letter Queue)に入らないので、注意が必要です。
遅延キュー
遅延キューを使うことで、メッセージがSQSに登録されてから指定した時間メッセージが非表示になります。
遅延キューは「キューに入ってくる新しいメッセージ全てに適用される」ため、キューに登録されてからn分後にコンシューマーで処理させたい場合などに利用することが出来るかと思います。
可視性タイムアウト
可視性タイムアウトは ReceiveMessage
でメッセージを取得してから指定した時間の間、同じメッセージを取得出来なくなる、という物です。
SQSは一部の例外(Dead Letter Queueと保存期間切れ)を除き、明示的に DeleteMessage
でメッセージを削除しない限りキューから削除されないので、処理中に別のコンシューマーが ReceiveMessage
をして同じメッセージを受け取らないように可視性タイムアウトを設定する事で、多重実行の最小化が出来ます。(最小化出来るだけで、前述の通り標準キューでは複数回同じメッセージを受け取る可能性もあります。)
DLQ(Dead Letter Queue)
DLQ(Dead Letter Queue)は、メッセージを何らかの理由で処理出来なかった場合にメインのキューからDLQにメッセージを移動させる機能になります。
具体的には、指定した回数以上 ReceiveMessage
された場合にDLQに移動する形になります。
DLQを利用するメリットとして、例えば何らかの理由でコンシューマーが処理出来ないメッセージがキューに入ってしまった場合、DLQを利用しないとメッセージの保存期間が過ぎるまでキューに貯まり、 ReceiveMessage
で処理出来ないメッセージを引くたびに無駄が生じてしまいます。
DLQを利用することでこのようなメッセージを排除して、DLQに入ったらアラートを飛ばすようにしたり、処理出来なかった時用のコンシューマーを用意するなど、ケースに応じて柔軟に対応することが出来ます。
まとめ
以上、『AWS 再入門ブログリレー 2020』の 5日目のエントリ『Amazon SQS』編でした。 来週火曜日 (8/11) はたぬきの「Amazon S3」の予定です。お楽しみに!!
参考
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html